Social media is now an essential part of modern life, and people have become used to sharing their thoughts and opinions on platforms such as Twitter and LinkedIn. In this exploratory and preliminary study, we use approximately 1 million tweets collected between Oct. 19th, 7:30pm and Oct. 20th, 1:30am to try to predict the outcome of the presidential election. Note that the debate started around 9:00pm on Oct. 19th. In addition to providing insight into how social media reacts to a presidential debate, this study also serves as an exploration of the performance of the Word2Vec module in Apache Spark's spark.ml package.
The calculations were carried out on a homogeneous (Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz) stand-alone Spark cluster consisting of a master node (2 CPUs, 4GB RAM) and two slave nodes (8 CPUs, 6GB RAM each), all running Scientific Linux 6 as the operating system. Since this minimal cluster was built temporarily for this study from the desktop computers in my office, no job scheduler such as YARN or Mesos was used. The actual Spark application is this Jupyter Notebook, which can be regarded as a PySpark shell.
In this study, tweets gathered directly via the Twitter Streaming API are in JSON format, and each tweet may have dramatically different fields. Hence flexibility in schema design is important when choosing the database for warehousing and serving the collected tweets. MongoDB is a NoSQL database that provides this schema flexibility. Although it is also highly scalable and secure via sharding and replica sets, a single MongoDB instance is sufficient for this study, considering the data size and the streaming bandwidth limit imposed by Twitter.
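To illustrate why a flexible schema matters here, consider two tweet payloads with different sets of fields; a minimal sketch with invented field values, using only the standard-library `json` module. A document store like MongoDB accepts both as-is, whereas a fixed relational schema would need nullable columns or a migration for each new field:

```python
import json

# Two hypothetical tweet payloads: the second carries an extra
# "coordinates" field that the first lacks.
doc_a = json.loads('{"text": "hello", "lang": "en"}')
doc_b = json.loads('{"text": "hi", "lang": "en", "coordinates": [-78.9, 36.0]}')

# Both documents can live in the same collection despite differing shapes.
collection = [doc_a, doc_b]
fields = [sorted(d.keys()) for d in collection]
print(fields)
```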
All the tweets are separated into two sections: before the 3rd presidential debate, and during/after the debate. Technology-wise, Python is used as the main programming language, and standard Python tools such as NumPy and Pandas are used for data munging. On Twitter, people tend to use abbreviated words and phrases, which makes text cleaning a challenge, so NLTK was used for word stemming and stop-word removal. The tweet text was analyzed using the Word2Vec NLP algorithm (skip-gram model) and the KMeans clustering algorithm. Finally, the data visualization was carried out using plotly and ggplot. Note that both Word2Vec and KMeans were run with Apache Spark's spark.ml package. For querying the results and performing window aggregation, both its DataFrame API and standard SQL were used.
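The cleaning steps described above, minus the NLTK stemming, can be sketched with plain `re` and Python 3 string methods; the sample tweet and the tiny stop-word set are invented for illustration:

```python
import re
import string

STOPS = {"the", "is", "at", "on"}  # stand-in for NLTK's English stop-word list

def clean(text):
    """Strip URLs, @mentions, and #hashtags, drop punctuation,
    lower-case, and remove stop words."""
    text = re.sub(r'https?://\S+', ' ', text)   # URLs
    text = re.sub(r'@\w+', ' ', text)           # mentions
    text = re.sub(r'#\w+', ' ', text)           # hashtags
    text = text.translate(str.maketrans('', '', string.punctuation))
    return [w for w in text.lower().split() if w not in STOPS]

tokens = clean("The debate is on! @nbc airs it at 9pm http://example.com #debate")
print(tokens)
```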
In this study, we only selected the original tweets as the sample, assuming that the distribution of political opinions within original tweets correctly reflects that among all voters. We also kept only tweets in English, to simplify the NLP pipeline. The word vector length was set to 100, a typical value for short documents. The number of clusters was set to 5 for this preliminary study.
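For intuition on the skip-gram model underlying Word2Vec: each word is used to predict the context words within a window around it. A minimal sketch of generating the (center, context) training pairs, with an invented three-word sentence and window size:

```python
def skipgram_pairs(tokens, window=2):
    """Generate (center, context) training pairs as in the skip-gram model."""
    pairs = []
    for i, center in enumerate(tokens):
        lo = max(0, i - window)
        hi = min(len(tokens), i + window + 1)
        for j in range(lo, hi):
            if j != i:  # a word is not its own context
                pairs.append((center, tokens[j]))
    return pairs

pairs = skipgram_pairs(["vote", "debate", "winner"], window=1)
print(pairs)
```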
As shown below, the KMeans calculation converged on the 100 "features" generated by the Word2Vec step. By comparing individual word vectors and actual tweet text against the centroid vectors, we describe the 5 categories for both pre-debate and in/post-debate as:
| | Pre-Debate | In/Post-Debate |
|---|---|---|
| Category 0 | Neutral: Ads/Reports | Neutral: Expressing frustration on this election |
| Category 1 | Neutral: Unbiased comments | Slightly Supporting Clinton |
| Category 2 | Slightly Supporting Trump | Against Trump |
| Category 3 | Neutral: Reports | Neutral: Reports |
| Category 4 | Slightly Supporting Clinton | Slightly Supporting Trump |
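The category labels above were assigned by comparing word and tweet vectors against the centroid vectors; at its core that comparison is a similarity measure such as cosine similarity, sketched here with made-up 3-dimensional vectors (the real vectors are 100-dimensional):

```python
import math

def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

centroid = [1.0, 0.0, 0.0]   # hypothetical cluster centroid
word_vec = [0.8, 0.6, 0.0]   # hypothetical word vector
print(cosine(centroid, word_vec))
```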
According to the visualization, the number of tweets supporting Hillary Clinton exceeds the number supporting Donald Trump, so this study predicts that Hillary Clinton will win the presidential election. Interestingly, the visualization also shows that as the start of the 3rd presidential debate approached, more and more people became "neutral" or "undecided". After the debate started, however, more people "made a choice": both Trump and Clinton gained supporters, but Clinton kept the larger share.
Although our NLP model (Word2Vec + KMeans) predicts Hillary Clinton as the winner of this election, the aggregation of the most popular hashtags shows a different outcome. As shown below, the number of tweets with hashtags supporting Trump is larger than the number with hashtags supporting Clinton. This discrepancy indicates that our NLP model may not be accurate enough; it needs more hyperparameter tuning (e.g. word vector length, number of KMeans clusters) and more careful text cleaning. It is interesting that our current model can roughly differentiate neutral opinions from slightly biased ones, but more categories may need to be added, considering how complex people's political views are.
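The hashtag aggregation is carried out below with Spark SQL; in miniature, counting and ranking hashtags amounts to a frequency tally, sketched here with `collections.Counter` and an invented tag list:

```python
from collections import Counter

# Flattened list of hashtags pulled from tweet entities (invented sample)
tags = ["debate", "MAGA", "ImWithHer", "debate", "MAGA", "debate"]

counts = Counter(tags)
top = counts.most_common(2)  # analogous to GROUP BY ... ORDER BY count DESC
print(top)
```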
import pymongo
from pymongo import MongoClient
client = MongoClient('mongodb://152.3.169.27:27017/')
import numpy as np, scipy as sp
import pandas as pd
from pandas import DataFrame as df, Series as ss
import sklearn as sk
import ggplot, seaborn as sns
from pyspark.sql.types import *
from pyspark.sql.functions import *
%matplotlib inline
import random, string
import re
from bs4 import BeautifulSoup
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
ps = PorterStemmer()
tweets = client['db_third_debate']
pre_debate = tweets['pre_debate']
in_debate = tweets['in_debate']
pre_debate.find({"retweeted_status":{"$exists": False}}).count()
in_debate.find({"retweeted_status":{"$exists": False}}).count()
pre_debate_tweets = pd.DataFrame(list(pre_debate.find({"retweeted_status":{"$exists": False}},
    # Nested fields are projected with dot notation, e.g. "user.location"
    projection={"_id": False, "text": True, "lang": True, "timestamp_ms": True, "user.location": True, \
                "retweet_count": True, "entities.hashtags.text": True})) )
pre_debate_tweets = pre_debate_tweets.dropna(how='any', subset=['text'])
in_debate_tweets = pd.DataFrame(list(in_debate.find({"retweeted_status":{"$exists": False}},
    # Nested fields are projected with dot notation, e.g. "user.location"
    projection={"_id": False, "text": True, "lang": True, "timestamp_ms": True, "user.location": True, \
                "retweet_count": True, "entities.hashtags.text": True})) )
in_debate_tweets = in_debate_tweets.dropna(how='any', subset=['text'])
pre_debate_tweets['timestamp_ms'] = pre_debate_tweets['timestamp_ms'].astype(np.int)
pre_debate_tweets['retweet_count'] = pre_debate_tweets['retweet_count'].astype(np.int)
pre_debate_tweets['tweet'] = pre_debate_tweets['text']
in_debate_tweets['timestamp_ms'] = in_debate_tweets['timestamp_ms'].astype(np.int)
in_debate_tweets['retweet_count'] = in_debate_tweets['retweet_count'].astype(np.int)
in_debate_tweets['tweet'] = in_debate_tweets['text']
pre_sample = random.sample(range(0, pre_debate_tweets.shape[0]), 25000)
in_sample = random.sample(range(0, in_debate_tweets.shape[0]), 25000)
# pre_debate_tweets_sample = pre_debate_tweets.iloc[pre_sample].copy()
pre_debate_tweets_sample = pre_debate_tweets[pre_debate_tweets['lang']=="en"].copy()
pre_debate_tweets_sample.shape
# in_debate_tweets_sample = in_debate_tweets.iloc[in_sample].copy()
in_debate_tweets_sample = in_debate_tweets[in_debate_tweets['lang']=="en"].copy()
in_debate_tweets_sample.shape
class cleanTweets(object):
    acc = 0
    stops = set(stopwords.words("english"))

    @classmethod
    def clearText(cls, textString):
        '''
        @param textString: String
        '''
        # Guard against NaN cells, which pandas hands over as floats
        if isinstance(textString, float):
            textString = str(textString)
        # Back to a plain ASCII string
        s1 = str(textString.encode('ascii', 'ignore'))
        # Remove URLs
        s1 = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', ' ', s1)
        # Remove @mentions
        s1 = re.sub(r'@\w+\b', ' ', s1)
        # Remove hashtags; hashtags are collected separately from the tweet entities
        s1 = re.sub(r'#\w+\b', ' ', s1)
        # Punctuation, with special treatment for "U.S."
        s1 = re.sub(r'U\.S\.', 'US', s1)
        s1 = s1.translate(None, string.punctuation)
        # Tokenize, drop stop words, and stem
        s1 = nltk.word_tokenize(s1)
        words = [ps.stem(w.lower()) for w in s1 if not w.lower() in cls.stops]
        cls.acc = cls.acc + 1
        return words

    @classmethod
    def collectHashTags(cls, thisEntities):
        j = []
        for tag in thisEntities['hashtags']:
            j.append(str(tag[u'text'].encode('ascii', 'ignore')))
        cls.acc = cls.acc + 1
        return j
cleanTweets.acc = 0
pre_debate_tweets_sample['text'] = pre_debate_tweets_sample['text'].apply(cleanTweets.clearText)
in_debate_tweets_sample['text'] = in_debate_tweets_sample['text'].apply(cleanTweets.clearText)
pre_debate_tweets_sample['entities'] = pre_debate_tweets_sample['entities'].apply(cleanTweets.collectHashTags)
in_debate_tweets_sample['entities'] = in_debate_tweets_sample['entities'].apply(cleanTweets.collectHashTags)
# spark.conf.set('spark.executor.memory', '6G')
sp_pre_debate_tweets = spark.createDataFrame(pre_debate_tweets_sample)
sp_pre_debate_tweets.cache()
sp_in_debate_tweets = spark.createDataFrame(in_debate_tweets_sample)
sp_in_debate_tweets.cache()
sp_pre_debate_tweets = sp_pre_debate_tweets.withColumnRenamed('entities', 'hashtags')
sp_in_debate_tweets = sp_in_debate_tweets.withColumnRenamed('entities', 'hashtags')
pyspark.ml.feature.Word2Vec and pyspark.ml.clustering.KMeans

from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol="text", outputCol="word_vector")
w2v_pre_model = word2Vec.fit(sp_pre_debate_tweets)
w2v_in_model = word2Vec.fit(sp_in_debate_tweets)
w2v_pre_result = w2v_pre_model.transform(sp_pre_debate_tweets)
w2v_pre_result.cache()
w2v_in_result = w2v_in_model.transform(sp_in_debate_tweets)
w2v_in_result.cache()
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol="word_vector", predictionCol="prediction", k=5, seed=1023)
kmeans_pre_model = kmeans.fit(w2v_pre_result)
kmeans_in_model = kmeans.fit(w2v_in_result)
trans_w2v_pre_results = kmeans_pre_model.transform(w2v_pre_result)
trans_w2v_in_results = kmeans_in_model.transform(w2v_in_result)
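KMeans here runs on the 100-dimensional word vectors; its core iteration (Lloyd's algorithm, which spark.ml's KMeans is based on) can be shown in miniature on 1-D points with made-up data and a fixed initialization:

```python
def kmeans_1d(points, centroids, iters=10):
    """A bare-bones Lloyd's iteration on 1-D points."""
    for _ in range(iters):
        # Assignment step: each point goes to its nearest centroid
        clusters = [[] for _ in centroids]
        for p in points:
            idx = min(range(len(centroids)), key=lambda i: abs(p - centroids[i]))
            clusters[idx].append(p)
        # Update step: move each centroid to the mean of its cluster
        centroids = [sum(c) / len(c) if c else centroids[i]
                     for i, c in enumerate(clusters)]
    return centroids, clusters

centroids, clusters = kmeans_1d([0.0, 1.0, 10.0, 11.0], centroids=[0.0, 5.0])
print(centroids, clusters)
```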
trans_w2v_pre_results = trans_w2v_pre_results.withColumn('time',
from_unixtime(trans_w2v_pre_results['timestamp_ms']/1000)).sort('time')
trans_w2v_in_results = trans_w2v_in_results.withColumn('time',
from_unixtime(trans_w2v_in_results['timestamp_ms']/1000)).sort('time')
trans_w2v_pre_results = trans_w2v_pre_results.withColumn('grid',
window(trans_w2v_pre_results['time'], "20 minute"))
trans_w2v_in_results = trans_w2v_in_results.withColumn('grid',
window(trans_w2v_in_results['time'], "30 minute"))
trans_w2v_pre_results.createOrReplaceTempView("pre_result")
trans_w2v_in_results.createOrReplaceTempView("in_result")
pre_result_window = spark.sql("""
SELECT prediction, COUNT(prediction) AS _count, grid.start
FROM pre_result
GROUP BY prediction, grid.start
ORDER BY prediction, grid.start
""")
pre_result_pdf = pre_result_window.toPandas()
in_result_window = spark.sql("""
SELECT prediction, count(prediction) AS _count, grid.start
FROM in_result
GROUP BY prediction, grid.start
ORDER BY prediction, grid.start
""")
in_result_pdf = in_result_window.toPandas()
plotly and cufflinks

pre_time = list(pre_result_pdf['start'].unique())
in_time = list(in_result_pdf['start'].unique())
pre_count = []
in_count = []
for itime in pre_time:
    pre_count.append(pre_result_pdf[pre_result_pdf['start']==itime]['_count'].sum())
for itime in in_time:
    in_count.append(in_result_pdf[in_result_pdf['start']==itime]['_count'].sum())
pre_result_pdf['percentage'] = pre_result_pdf['_count']
in_result_pdf['percentage'] = in_result_pdf['_count']
for index, row in pre_result_pdf.iterrows():
    pre_result_pdf.loc[index, 'percentage'] = float(row['_count']) / \
        float(pre_count[pre_time.index(np.datetime64(row['start']))])
for index, row in in_result_pdf.iterrows():
    in_result_pdf.loc[index, 'percentage'] = float(row['_count']) / \
        float(in_count[in_time.index(np.datetime64(row['start']))])
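The loop above normalizes each cluster's count by the total count of its time window; the same per-window normalization in miniature, using plain dicts and invented counts:

```python
# (window_start, cluster) -> tweet count, invented for illustration
counts = {("19:40", 0): 30, ("19:40", 1): 70,
          ("20:00", 0): 25, ("20:00", 1): 75}

# Total tweets per time window
totals = {}
for (start, _cluster), n in counts.items():
    totals[start] = totals.get(start, 0) + n

# Share of each cluster within its own window
percentage = {k: float(n) / totals[k[0]] for k, n in counts.items()}
print(percentage)
```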
import cufflinks as cf
print cf.__version__
import plotly.plotly as py
pre_result_pdf_pivot = pre_result_pdf.pivot('start', 'prediction')['percentage']
in_result_pdf_pivot = in_result_pdf.pivot('start', 'prediction')['percentage']
pre_result_pdf_pivot.index
cf.set_config_file(offline=True, world_readable=True, theme='ggplot')
pre_result_pdf_pivot.iplot(kind='barh',barmode='stack', bargap=.3, filename='cf_pre_debate')
cf.set_config_file(offline=True, world_readable=True, theme='ggplot')
in_result_pdf_pivot.iplot(kind='barh',barmode='stack', bargap=.3, filename='cf_in_debate')
for category in range(5):
    print "# ===========> Category {0} <=========== #".format(category)
    trans_w2v_pre_results.filter(trans_w2v_pre_results['prediction']==category).select('tweet').show(10, False)
for category in range(5):
    print "# ===========> Category {0} <=========== #".format(category)
    trans_w2v_in_results.filter(trans_w2v_in_results['prediction']==category).select('tweet').show(10, False)
pre_debate_words = []
for index, row in pre_debate_tweets_sample.iterrows():
    pre_debate_words.append(row['text'])
pre_debate_words_flat = [val for sublist in pre_debate_words for val in sublist]
len(pre_debate_words_flat)
in_debate_words = []
for index, row in in_debate_tweets_sample.iterrows():
    in_debate_words.append(row['text'])
in_debate_words_flat = [val for sublist in in_debate_words for val in sublist]
len(in_debate_words_flat)
pre_debate_words_df = spark.createDataFrame(pd.DataFrame(pre_debate_words_flat, columns=['word']))
pre_debate_words_df.createOrReplaceTempView("pre_debate_words_window")
spark.sql("""
SELECT word, COUNT(word) AS count
FROM pre_debate_words_window
GROUP BY word
ORDER BY count DESC
""").show(10)
in_debate_words_df = spark.createDataFrame(pd.DataFrame(in_debate_words_flat, columns=['word']))
in_debate_words_df.createOrReplaceTempView("in_debate_words_window")
spark.sql("""
SELECT word, COUNT(word) AS count
FROM in_debate_words_window
GROUP BY word
ORDER BY count DESC
""").show(10)
pre_debate_tags = []
for index, row in pre_debate_tweets_sample.iterrows():
    pre_debate_tags.append(row['entities'])
pre_debate_tags_flat = [val for sublist in pre_debate_tags for val in sublist]
in_debate_tags = []
for index, row in in_debate_tweets_sample.iterrows():
    in_debate_tags.append(row['entities'])
in_debate_tags_flat = [val for sublist in in_debate_tags for val in sublist]
pre_debate_tags_df = spark.createDataFrame(pd.DataFrame(pre_debate_tags_flat, columns=['tags']))
pre_debate_tags_df.createOrReplaceTempView("pre_debate_tags_window")
in_debate_tags_df = spark.createDataFrame(pd.DataFrame(in_debate_tags_flat, columns=['tags']))
in_debate_tags_df.createOrReplaceTempView("in_debate_tags_window")
spark.sql("""
SELECT tags, COUNT(tags) AS tag_count
FROM pre_debate_tags_window
GROUP BY tags
ORDER BY tag_count DESC
""").show(30)
# Tag counts for pro-Trump vs. pro-Clinton hashtags, summed by hand from the table above
print "pre-debate:\nTrump\t:\t{0}\nClinton\t:\t{1}".format(2594 + 1128 + 613 + 595 + 498 + 201 + 196 + 180 + 175,
                                                           1323 + 967 + 617 + 267 + 193)
spark.sql("""
SELECT tags, COUNT(tags) AS tag_count
FROM in_debate_tags_window
GROUP BY tags
ORDER BY tag_count DESC
""").show(30)
ggplot for offline view

ggplot.ggplot(pre_result_pdf, ggplot.aes(x='start', weight='percentage', fill='factor(prediction)')) \
    + ggplot.geom_bar()
ggplot.ggplot(in_result_pdf, ggplot.aes(x='start', weight='percentage', fill='factor(prediction)')) \
+ ggplot.geom_bar()